/**
 * FileName: StructuredStreamingJoin
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/03/21 11:26
 * Description: The data source is set by the command-line argument (hdfs/kafka)
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.factory.SourceFactory;
import cn.com.bonc.process.chain.ProcessChain;
import cn.com.bonc.process.impl.JoinExternalDataProcessImpl;
import cn.com.bonc.process.impl.MutiCols2ColProcessImpl;
import cn.com.bonc.process.impl.RuleActionProcessImpl;
import cn.com.bonc.process.impl.SqlListProcessImpl;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;


/**
 * Driver for a Spark Structured Streaming job.
 *
 * <p>The job reads a streaming dataset from the source named on the command line
 * ({@link Constants#HDFS_SOURCE} or {@link Constants#KAFKA_SOURCE}, matched
 * case-insensitively), passes it through a fixed {@link ProcessChain}
 * (rule action, external-data join, SQL list, multi-column merge) and writes the
 * result to a sink chosen from the same source name: a Kafka topic for the Kafka
 * source, a text file sink on HDFS otherwise.
 */
public class StructuredStreamingJoin {

	/**
	 * HDFS directory used by the text sink as both its output path and its checkpoint location.
	 * NOTE(review): sharing one directory for data and checkpoint state is unusual; it is kept
	 * as-is so that existing checkpoints remain valid — confirm whether a separate checkpoint
	 * directory is wanted.
	 */
	private static final String SAVE_PATH = "hdfs://192.168.70.21:9000/test_szj/parquet9";

	/** Checkpoint directory used by the Kafka sink. */
	private static final String KAFKA_CHECKPOINT_PATH = "hdfs://192.168.70.21:9000/test_szj/checkpoint";

	/** Topic the Kafka sink publishes to. */
	private static final String KAFKA_SINK_TOPIC = "savetopic";

	/** Message printed when the command line does not name a supported source. */
	private static final String USAGE_MESSAGE = "No data source selected !(hdfs/kafka)";

	/**
	 * Entry point: validates the arguments, builds the processing pipeline, starts the
	 * streaming query and blocks until it terminates.
	 *
	 * @param args exactly one element naming the data source (hdfs/kafka, any case);
	 *             anything else prints a usage message and exits with status 1
	 * @throws IllegalStateException if the streaming query terminates with an error
	 */
	public static void main(String[] args) {
		String sourceName = resolveSourceName(args);

		SparkSession spark = SparkSession
				.builder()
				.appName("JoinProcessApp")
				.getOrCreate();

		// Raw type kept on purpose: the declared return type of getDataset(...) is not
		// visible from this file, so it is passed through unchanged.
		Dataset dataset = SourceFactory.create(sourceName).getDataset(spark);
		Dataset<Row> result = ProcessChain.setSourceData(dataset)
				.addProcess(new RuleActionProcessImpl())
				.addProcess(new JoinExternalDataProcessImpl(spark))
				.addProcess(new SqlListProcessImpl(spark))
				.addProcess(new MutiCols2ColProcessImpl())
				.execute();

		StreamingQuery query = configureSink(result.writeStream(), sourceName).start();

		try {
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			// Propagate the failure so the driver does not finish with a success status
			// after the query has died; the original cause is preserved.
			throw new IllegalStateException("Streaming query terminated with an error", e);
		}
	}

	/**
	 * Extracts and validates the data source name from the command line.
	 * Prints a usage message to stderr and exits the JVM with status 1 when the
	 * argument count is not exactly one or the name is not a supported source.
	 *
	 * @param args raw command-line arguments
	 * @return the upper-cased source name, equal to one of the supported source constants
	 */
	private static String resolveSourceName(String[] args) {
		if (args.length == 1) {
			String candidate = args[0].toUpperCase();
			if (candidate.equals(Constants.HDFS_SOURCE) || candidate.equals(Constants.KAFKA_SOURCE)) {
				return candidate;
			}
		}
		System.err.println(USAGE_MESSAGE);
		System.exit(1);
		return ""; // unreachable; required by the compiler after System.exit
	}

	/**
	 * Applies the sink settings that match the selected source.
	 *
	 * <p>The two configurations are mutually exclusive. Previously the text-sink settings
	 * were applied unconditionally after the Kafka ones; because the writer's setters
	 * modify the same writer instance, that replaced the Kafka format, output mode and
	 * checkpoint location, so the Kafka sink was never actually used.
	 *
	 * @param writer     writer obtained from the result dataset
	 * @param sourceName validated, upper-cased source name
	 * @return the configured writer, ready to be started
	 */
	private static DataStreamWriter<Row> configureSink(DataStreamWriter<Row> writer, String sourceName) {
		if (sourceName.equals(Constants.KAFKA_SOURCE)) {
			return writer
					.outputMode(OutputMode.Update())
					.format("kafka")
					.option("checkpointLocation", KAFKA_CHECKPOINT_PATH)
					.option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
					.option("topic", KAFKA_SINK_TOPIC);
		}
		return writer
				.outputMode(OutputMode.Append())
				.format("text")
				.option("checkpointLocation", SAVE_PATH)
				.option("path", SAVE_PATH);
	}
}